feat: kafka extended configuration for SASL auth & TLS#355
Conversation
|
bump @evanpurkhiser |
| pub struct KafkaConfig { | ||
| /// Kafka security protocol to use. The value must be one of "plaintext, "ssl", "sasl_plaintext", "sasl_ssl". | ||
| /// If not specified, defaults to "plaintext". | ||
| pub kafka_security_protocol: Option<String>, |
There was a problem hiding this comment.
If these are the only possible options, let's represent them as an enum, then.
| }, | ||
| results_kafka_cluster: vec!["127.0.0.1:9092".to_owned()], | ||
| results_kafka_topic: "uptime-results".to_owned(), | ||
| kafka_config: KafkaConfig { |
There was a problem hiding this comment.
Lets's derive a default for KafkaConfig, so that we can just use that here.
| pub results_kafka_topic: String, | ||
|
|
||
| /// Kafka extended configuration | ||
| #[serde(flatten)] |
There was a problem hiding this comment.
Should be able to use #[serde(flatten, with = "kafka_")], and then not have to have the kafka_ prefix on all those struct field names
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds extended Kafka producer configuration (TLS/SASL/security protocol) to make Kafka output work in secured clusters.
Changes:
- Introduces
KafkaConfig+KafkaSecurityProtocolin app config, plus env-var mappings/tests. - Wires new Kafka config fields into Kafka producer overrides (
ssl.*,sasl.*,security.protocol) whenProducerMode::Kafkais used.
Reviewed changes
Copilot reviewed 2 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
src/manager.rs |
Builds Kafka override properties from new kafka_config fields (TLS/SASL/security protocol). |
src/app/config.rs |
Adds Kafka config types/defaults and expands config tests to cover new env vars. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if let Some(kafka_security_protocol) = &config.kafka_config.security_protocol | ||
| { | ||
| kafka_overrides.insert( | ||
| "security.protocol".to_string(), | ||
| kafka_security_protocol.to_string(), | ||
| ); | ||
| } |
| /// Kafka security protocol to use. The value must be one of "plaintext, "ssl", "sasl_plaintext", "sasl_ssl". | ||
| /// If not specified, defaults to "plaintext". |
| #[derive(PartialEq, Debug, Serialize, Deserialize)] | ||
| pub struct KafkaConfig { | ||
| /// Kafka security protocol to use. The value must be one of "plaintext, "ssl", "sasl_plaintext", "sasl_ssl". | ||
| /// If not specified, defaults to "plaintext". | ||
| pub security_protocol: Option<KafkaSecurityProtocol>, | ||
|
|
||
| /// TLS CA certificate location for Kafka. | ||
| pub ssl_ca_location: Option<String>, | ||
|
|
||
| /// TLS certificate location for Kafka. | ||
| pub ssl_cert_location: Option<String>, | ||
|
|
||
| /// TLS private key location for Kafka. | ||
| pub ssl_key_location: Option<String>, | ||
|
|
||
| /// SASL mechanism to use for Kafka. The value must be one of "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512". | ||
| pub sasl_mechanism: Option<String>, | ||
|
|
||
| /// SASL username for Kafka. | ||
| pub sasl_username: Option<String>, | ||
|
|
||
| /// SASL password for Kafka. | ||
| pub sasl_password: Option<String>, | ||
| } |
| impl Default for KafkaConfig { | ||
| fn default() -> Self { | ||
| Self { | ||
| security_protocol: None, | ||
| ssl_ca_location: None, | ||
| ssl_cert_location: None, | ||
| ssl_key_location: None, | ||
| sasl_mechanism: None, | ||
| sasl_username: None, | ||
| sasl_password: None, | ||
| } | ||
| } | ||
| } |
| if let Some(kafka_ssl_ca_location) = &config.kafka_config.ssl_ca_location { | ||
| kafka_overrides.insert( | ||
| "ssl.ca.location".to_string(), | ||
| kafka_ssl_ca_location.to_owned(), | ||
| ); | ||
| } | ||
|
|
||
| if let Some(kafka_ssl_cert_location) = &config.kafka_config.ssl_cert_location | ||
| { | ||
| kafka_overrides.insert( | ||
| "ssl.certificate.location".to_string(), | ||
| kafka_ssl_cert_location.to_owned(), | ||
| ); | ||
| } | ||
|
|
||
| if let Some(kafka_ssl_key_location) = &config.kafka_config.ssl_key_location { | ||
| kafka_overrides.insert( | ||
| "ssl.key.location".to_string(), | ||
| kafka_ssl_key_location.to_owned(), | ||
| ); | ||
| } | ||
|
|
||
| if let Some(kafka_security_protocol) = &config.kafka_config.security_protocol | ||
| { | ||
| kafka_overrides.insert( | ||
| "security.protocol".to_string(), | ||
| kafka_security_protocol.to_string(), | ||
| ); | ||
| } | ||
|
|
||
| if let Some(kafka_sasl_mechanism) = &config.kafka_config.sasl_mechanism { | ||
| kafka_overrides.insert( | ||
| "sasl.mechanism".to_string(), | ||
| kafka_sasl_mechanism.to_owned(), | ||
| ); | ||
| } | ||
|
|
||
| if let Some(kafka_sasl_username) = &config.kafka_config.sasl_username { | ||
| kafka_overrides | ||
| .insert("sasl.username".to_string(), kafka_sasl_username.to_owned()); | ||
| } | ||
|
|
||
| if let Some(kafka_sasl_password) = &config.kafka_config.sasl_password { | ||
| kafka_overrides | ||
| .insert("sasl.password".to_string(), kafka_sasl_password.to_owned()); | ||
| } |
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 111767d. Configure here.

No description provided.